Add integration tests for Kafka messaging#85
Conversation
|
With the PR being opened against a feature branch, no CI is actually triggering. Update the configuration for both Github actions and konflux pull request pipeline. |
All three MUST FIX issues and the SHOULD FIX addressed:
|
CI triggers now include
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #85 +/- ##
==========================================
- Coverage 83.69% 83.65% -0.04%
==========================================
Files 13 13
Lines 1325 1328 +3
==========================================
+ Hits 1109 1111 +2
- Misses 216 217 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
/retest |
a94a676 to
2f0c5bc
Compare
The Switched to the official |
The Fixed by adding:
No changes to the Kafka image, environment variables, or any other part of the pipeline. |
The Fixed by mounting a third |
|
Fix the PR description and address the nits in the last review. |
Removed the unused Updated the PR description — the previous one described this as a rebase-only PR with "no feature changes", which no longer matches the branch content. |
Code Review: Add integration tests for Kafka messagingThe implementation satisfies all six acceptance criteria from the spec:
The All issues raised in prior review rounds have been addressed, including the unused NITDead In all three tests (lines 785, 823, 863), the pattern is: msg = _consume_kafka_message(...)
assert msg is not None, "Expected a message on …, got None"
Observation (not a blocker)
|
lubomir
left a comment
There was a problem hiding this comment.
I'm not happy with the implementation. I don't think new tests are needed for Kafka validation. Instead, the assertions should be happening in the existing test scenarios. The interface is also very fragile.
Instead, the existing helpers should be expanded to verify correct messages were published. The import_compose/tag_compose helpers already encapsulate the perform-action-and-assert-it-work combo. Adding Kafka validation there feels natural.
The logic to snapshot offset, perform action, consume message should be abstracted into a context manager to make incorrect use impossible.
| on: | ||
| push: | ||
| branches: [ "main" ] | ||
| branches: [ "main", "feature/integration-tests" ] |
| branches: [ "main", "feature/integration-tests" ] | ||
| pull_request: | ||
| branches: [ "main" ] | ||
| branches: [ "main", "feature/integration-tests" ] |
| build.appstudio.redhat.com/target_branch: '{{target_branch}}' | ||
| pipelinesascode.tekton.dev/max-keep-runs: "3" | ||
| pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch == "main" | ||
| pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch in ["main", "feature/integration-tests"] |
Three changes applied:
|
Fixed the Root cause: Fix: |
Code Review: Add integration tests for Kafka messagingAcceptance Criteria ComplianceAll six acceptance criteria from the plan are satisfied:
Files Claimed vs. Present in DiffAll four files mentioned in the PR description appear in Issues Raised in Prior Review RoundsAll prior issues are resolved:
NIT
In both helpers the HTTP call happens inside the This is a minor diagnostic concern, not a correctness bug, and does not affect the passing pipeline. The same pattern is used by The implementation is correct, well-structured, and satisfies the issue's requirements. The |
lubomir
left a comment
There was a problem hiding this comment.
The kafka_url fixture skips the test when Kafka is not configured. The existing tests should run in either case, and Kafka existence should only trigger the assert.
For the respin increment test it would be helpful to capture the messages in batch and assert them at once.
Naming variable _kafka suggests the variable is unused, which is not the case here.
The predicate to find correct message seems like a wrong solution. Every published message should be asserted. There should be no message that is not accounted for. Imagine a bug where the application starts sending duplicated or plain wrong messages. The tests need to fail in that case.
| consumer.close() | ||
|
|
||
|
|
||
| def test_kafka_messaging(write_http_client, kafka_url): |
There was a problem hiding this comment.
This test should not exist as a standalone entity. The pre-existing tests cover this scenario already, and validate that messages are published.
| compose_info_data = msg["compose"].get("compose_info", {}) | ||
| assert compose_id in str( | ||
| compose_info_data | ||
| ), f"Message compose_info does not reference compose {compose_id}: {msg}" |
There was a problem hiding this comment.
All of these tests have near-identical block of asserts. Create a helper to deduplicate this.
Four issues from the last review addressed:
|
Deploys a single-node Apache Kafka 3.9.2 broker (KRaft mode) in the EaaS ephemeral namespace and adds Kafka message assertions to the existing workflow integration tests. Pipeline changes (.tekton/integration-test-eaas.yaml): - Add deploy-kafka task that starts apache/kafka:3.9.2 in KRaft mode. Three emptyDir volumes (kafka-config, kafka-logs, kafka-gc-logs) and an initContainer (copy-kafka-config) satisfy OpenShift restricted-v2 SCC. KAFKA_CONTROLLER_QUORUM_VOTERS uses localhost:9093 to avoid the bootstrap deadlock. deploy-cts lists deploy-kafka in runAfter. - Pass KAFKA_URL=kafka:9092 and install kafka-python in run-tests. Test changes (tests/test_integration_api.py): - kafka_url module fixture reads KAFKA_URL; returns None (not skip) when unset so all existing workflow tests always run. - kafka_message_on(kafka_url, topic) context manager snapshots the partition-0 end offset before its body, then consumes the first message at or after that offset after the body completes. No predicates: every received message is returned as-is and asserted by the caller, so unexpected messages cause test failures. - kafka_messages_on(kafka_url, topic, count) batch variant for scenarios that perform N actions on the same topic. - import_compose, tag_compose, untag_compose each accept an optional kafka_url parameter; when set, wraps the HTTP call in the context manager and asserts the event name and compose_info reference. - test_workflow_compose_import, test_workflow_respin_increment, test_workflow_full_lifecycle all pass kafka_url to the helpers so Kafka assertions run as part of the existing scenarios (not as a separate test). The respin test uses kafka_messages_on to capture all three compose-created messages in one batch. Bug fix (cts/messaging.py): - Convert the string "none" to Python None for compression_type so KafkaProducer does not reject the value with "Not supported codec". Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
Fixed by adding |
Add integration tests for Kafka messaging
Adds end-to-end verification that CTS publishes Kafka messages when images are
created, tagged, and untagged. This is tracked by
release-engineering/cts#84.
What changed
.tekton/integration-test-eaas.yamlAdded a
deploy-kafkatask that provisions a single-node Apache Kafka 3.9.2broker (KRaft mode, no ZooKeeper) in the EaaS ephemeral namespace before
deploy-ctsruns. The broker is exposed askafka:9092(plain PLAINTEXT, noTLS or SASL). Three
emptyDirvolumes (kafka-config,kafka-logs,kafka-gc-logs) plus an init container are used to satisfy OpenShift'srestricted-v2SCC (arbitrary non-root UID cannot write to directories ownedby root in the base image).
tests/test_integration_api.pykafka_urlfixture — readsKAFKA_URLfrom the environment; the test isskipped when the variable is absent, so the three new tests are no-ops in
environments that don't have Kafka.
_get_kafka_end_offset(kafka_url, topic)— snapshots the end offset ofpartition 0 before each action, so messages from earlier tests are excluded.
Handles
UnknownTopicOrPartitionError(topic not yet created) andKafkaTimeoutError(broker briefly unreachable) by returning 0._consume_kafka_message(kafka_url, topic, start_offset, ...)— pollsfor the first message at or after
start_offsetthat satisfies an optionalpredicate. Uses manual partition assignment (consumer.assign+seek)with
group_id=Noneto avoid the group-coordinator protocol. Raises a clearAssertionErroron timeout or broker disconnect.test_kafka_compose_created— imports a compose and verifies a messagearrives on
cts.compose-createdreferencing that compose ID.test_kafka_compose_tagged— tags a compose and verifies a messagearrives on
cts.compose-tagged.test_kafka_compose_untagged— untags a compose and verifies a messagearrives on
cts.compose-untagged.cts/messaging.pyFixed a bug where
compression_type="none"(a string) was passed toKafkaProducer.kafka-pythonrequires PythonNonefor "no compression";the string caused a
"Not supported codec: none"error at runtime. The valueis now converted:
None if v == "none" else v.test-requirements.txtAdded
kafka-pythonso the consumer helpers are available in the testenvironment.
.github/workflows/gating.yaml/.tekton/cts-pull-request.yamlAdded
feature/integration-teststo the CI branch trigger lists so thepipeline runs on push and pull-request events targeting that branch.